#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <stdarg.h>

#define DBG_SUBSYS S_LIBSTORAGE

#include "sysy_lib.h"
#include "cache.h"
#include "get_version.h"
#include "cluster.h"
//#include "metadata.h"
#include "lich_md.h"
#include "configure.h"
#include "job_dock.h"
#include "chunk.h"
#include "main_loop.h"
#include "ynet_rpc.h"
#include "../controller/volume_ctl.h"
#include "../controller/stor_ctl.h"
#include "md_root.h"
#include "stor_rpc.h"
#include "vnode.h"
#include "volume.h"
#include "md_map.h"
#include "net_global.h"
#include "bh.h"
#include "ylog.h"
#include "dbg.h"

/*
struct of file
  ------------------------------------------------------------------
  | file head (first chunk) | chunk | chunk |
  ------------------------------------------------------------------
*/


typedef struct {
        io_t io;
        buffer_t *buf;
        func1_t func;
        void *arg;
        char pool[MAX_NAME_LEN];
} volume_async_ctx_t;

typedef vnode_entry_t entry_t;

#define MODE_MAX 01777
typedef struct {
        chkid_t chkid;
        nid_t location;
        uint64_t taskid;
} move_arg_t;

#define MOVE_PARALLELS 100

//static int __volume_try_create_chkinfo(const fileid_t *id, const chunk_io_t *ios, int count);

int volume_init()
{
        int ret;

        ret = vnode_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

static int __volume_truncate(entry_t *ent, uint64_t size)
{
        int ret;
        setattr_t setattr;
        fileinfo_t fileinfo;

        memset(&setattr, 0x0, sizeof(setattr));
#if LSV
        setattr.size.set_it = SET_LOGICAL_SIZE;
#else
        setattr.size.set_it = 1;
#endif
        setattr.size.size = size;
        ret = md_setattr(&ent->id, &fileinfo, &setattr);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ent->fileinfo = fileinfo;

        return 0;
err_ret:
        return ret;
}

int volume_truncate(const char *pool, const fileid_t *id, uint64_t size)
{
        int ret;
        mcache_entry_t *cent;

        if (id->type != __VOLUME_CHUNK__) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        DBUG("truncate "CHKID_FORMAT" size %llu\n", CHKID_ARG(id), (LLU)size);

        ret = vnode_get(&cent, pool, id, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = mcache_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = __volume_truncate(cent->value, size);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        mcache_unlock(cent);
        vnode_release(cent);

        return 0;
err_lock:
        mcache_unlock(cent);
err_release:
        vnode_release(cent);
err_ret:
        return _errno(ret);
}

int IO_FUNC volume_read(const char *pool, const io_t *io, buffer_t *buf, nid_t *_nid)
{
        int ret;
        nid_t nid;

#if 0
        if (size > LICH_CHUNK_SPLIT) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }
#endif

        if (unlikely(io->id.type != __VOLUME_CHUNK__)) {
                DWARN("read "CHKID_FORMAT" size %llu offset %llu\n",
                      CHKID_ARG(&io->id), (LLU)io->size, (LLU)io->offset);
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        DBUG("read file "CHKID_FORMAT" size %llu offset %llu\n",
             CHKID_ARG(&io->id), (LLU)io->size, (LLU)io->offset);

        if (_nid) {
                nid = *_nid;
        } else {
                ret = vnode_location(pool, &io->id, &nid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        if (net_islocal(&nid)) {
                ret = volume_ctl_read(io, buf, 1);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                ret = stor_rpc_read(&nid, io, buf);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        if (ret == EREMCHG) {
                vnode_reload(pool, &io->id);
        }
        return _errno(ret);
}

int volume_write(const char *pool, const io_t *io, const buffer_t *_buf, nid_t *_nid)
{
        int ret;
        nid_t nid;

        if (unlikely(io->id.type != __VOLUME_CHUNK__)) {
                DWARN("write "CHKID_FORMAT" size %llu offset %llu\n",
                      CHKID_ARG(&io->id), (LLU)io->size, (LLU)io->offset);
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        DBUG("write file "CHKID_FORMAT" size %llu offset %llu\n",
             CHKID_ARG(&io->id), (LLU)io->size, (LLU)io->offset);

        if (_nid) {
                nid = *_nid;
        } else {
                ret = vnode_location(pool, &io->id, &nid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        if (net_islocal(&nid)) {
                ret = volume_ctl_write(io, _buf, 1);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                ret = stor_rpc_write(&nid, io, _buf);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        if (ret == EREMCHG) {
                vnode_reload(pool, &io->id);
        }
        return _errno(ret);
}

int volume_pwrite(const char *pool, const fileid_t *id, const char *_buf, size_t size, off_t offset)
{
        int ret;
        buffer_t buf;
        io_t io;

        mbuffer_init(&buf, 0);
        ret = mbuffer_copy(&buf, _buf, size);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        io_init(&io, id, NULL, offset, size, 0);
        ret = volume_write(pool, &io, &buf, NULL);
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

        mbuffer_free(&buf);

        return 0;
err_free:
        mbuffer_free(&buf);
err_ret:
        return _errno(ret);
}

int volume_pread(const char *pool, const fileid_t *id, char *_buf, int size, off_t offset)
{
        int ret;
        buffer_t buf;
        io_t io;

        mbuffer_init(&buf, 0);

        io_init(&io, id, NULL, offset, size, 0);
        ret = volume_read(pool, &io, &buf, NULL);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        size = buf.len;
        mbuffer_get(&buf, _buf, size);
        mbuffer_free(&buf);

        return size;
err_ret:
        return 0 - _errno(ret);
}

int volume_unmap(const char *pool, const io_t *io)
{
        int ret;
        nid_t nid;

        if (unlikely(io->id.type != __VOLUME_CHUNK__)) {
                DWARN("write "CHKID_FORMAT" size %llu offset %llu\n",
                      CHKID_ARG(&io->id), (LLU)io->size, (LLU)io->offset);
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        DINFO("unmap "CHKID_FORMAT" size %llu offset %llu\n",
             CHKID_ARG(&io->id), (LLU)io->size, (LLU)io->offset);

        ret = vnode_location(pool, &io->id, &nid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (net_islocal(&nid)) {
                ret = volume_ctl_unmap(io, 1);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                ret = stor_rpc_unmap(&nid, io);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        if (ret == EREMCHG) {
                vnode_reload(pool, &io->id);
        }
        return _errno(ret);
}

/**
 * 任务调度给pool controller
 *
 * @param chkid
 * @param name
 * @return
 */
int volume_cleanup_bh(const chkid_t *chkid, const char *name)
{
        int ret, retry = 0;
        nid_t nid;

        if (chkid->type != __POOL_CHUNK__) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }

retry:
        ret = md_map_getsrv(chkid, &nid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (net_islocalcall(&nid)) {
                ret = stor_ctl_cleanup_bh(chkid, name);
                if (unlikely(ret)) {
                        if ((ret == EREMCHG || ret == ETIMEDOUT) && retry == 0) {
                                md_map_drop(chkid, &nid);
                                retry = 1;
                                goto retry;
                        } else
                                GOTO(err_ret, ret);
                }
        } else {
                ret = stor_rpc_cleanup_bh(&nid, chkid, name);
                if (unlikely(ret)) {
                        if ((ret == EREMCHG || ret == ETIMEDOUT) && retry == 0) {
                                md_map_drop(chkid, &nid);
                                retry = 1;
                                goto retry;
                        } else
                                GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return _errno(ret);
}

int volume_rollback_bh(const chkid_t *id, const char *name)
{
        int ret, retry = 0;
        nid_t nid;
        chkid_t chkid;

        if (id->type != __POOL_CHUNK__) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        str2chkid(&chkid, name);
        
retry:
        ret = md_map_getsrv(&chkid, &nid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (net_islocalcall(&nid)) {
                ret = stor_ctl_rollback_bh(&chkid);
                if (unlikely(ret)) {
                        if ((ret == EREMCHG || ret == ETIMEDOUT) && retry == 0) {
                                md_map_drop(id, &nid);
                                retry = 1;
                                goto retry;
                        } else
                                GOTO(err_ret, ret);
                }
        } else {
                ret = stor_rpc_rollback_bh(&nid, &chkid);
                if (unlikely(ret)) {
                        if ((ret == EREMCHG || ret == ETIMEDOUT) && retry == 0) {
                                md_map_drop(id, &nid);
                                retry = 1;
                                goto retry;
                        } else
                                GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return _errno(ret);
}

int volume_flat_bh(const chkid_t *id, const char *name)
{
        int ret, retry = 0;
        nid_t nid;
        chkid_t chkid;

        if (id->type != __POOL_CHUNK__) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        str2chkid(&chkid, name);
retry:
        ret = md_map_getsrv(&chkid, &nid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (net_islocal(&nid)) {
                ret = stor_ctl_flat_bh(&chkid);
                if (unlikely(ret)) {
                        if ((ret == EREMCHG || ret == ETIMEDOUT) && retry == 0) {
                                md_map_drop(id, &nid);
                                retry = 1;
                                goto retry;
                        } else
                                GOTO(err_ret, ret);
                }
        } else {
                ret = stor_rpc_flat_bh(&nid, &chkid);
                if (unlikely(ret)) {
                        if ((ret == EREMCHG || ret == ETIMEDOUT) && retry == 0) {
                                md_map_drop(id, &nid);
                                retry = 1;
                                goto retry;
                        } else
                                GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return _errno(ret);
}

int volume_rmsnap_bh(const chkid_t *id, const char *name)
{
        int ret, count, retry = 0;
        nid_t nid;
        chkid_t chkid;
        char tmp[MAX_NAME_LEN], *tmp2[2];

        if (id->type != __POOL_CHUNK__) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        strcpy(tmp, name);
        count = 2;
        _str_split(tmp, ':', tmp2, &count);

        // 卷名+改名后的快照名
        str2chkid(&chkid, tmp2[0]);

retry:
        ret = md_map_getsrv(&chkid, &nid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DINFO("chunk "CHKID_FORMAT" @ %s remove snap %s\n",
              CHKID_ARG(&chkid), network_rname(&nid), tmp2[1]);

        if (net_islocalcall(&nid)) {
                ret = stor_ctl_rmsnap_bh(&chkid, tmp2[1]);
                if (ret) {
                        if ((ret == EREMCHG || ret == ETIMEDOUT) && retry == 0) {
                                md_map_drop(id, &nid);
                                retry = 1;
                                goto retry;
                        } else
                                GOTO(err_ret, ret);
                }
        } else {
                ret = stor_rpc_rmsnap_bh(&nid, &chkid, tmp2[1]);
                if (ret) {
                        if ((ret == EREMCHG || ret == ETIMEDOUT) && retry == 0) {
                                md_map_drop(id, &nid);
                                retry = 1;
                                goto retry;
                        } else
                                GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return _errno(ret);
}

static void __volume_read_async(void *arg)
{
        int ret;
        volume_async_ctx_t *ctx = arg;

        ret = volume_read(ctx->pool, &ctx->io, ctx->buf, NULL);

        ctx->func(ctx->arg, &ret);
        yfree((void **)&ctx);
}

void volume_read_async(const char *pool, const fileid_t *id, buffer_t *buf, size_t size,
                       off_t offset, func1_t func, void *arg)
{
        int ret;
        volume_async_ctx_t *ctx;

        ret = ymalloc((void **)&ctx, sizeof(*ctx));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        io_init(&ctx->io, id, NULL, offset, size, 0);
        ctx->buf = buf;
        ctx->func = func;
        ctx->arg = arg;
        strcpy(ctx->pool, pool);

        ret = main_loop_request(__volume_read_async, ctx, "volume_read");
        if (unlikely(ret))
                GOTO(err_free, ret);

        return;
err_free:
        yfree((void **)&ctx);
err_ret:
        func(arg, &ret);
        return;
}

static void __volume_write_async(void *arg)
{
        int ret;
        volume_async_ctx_t *ctx = arg;

        ret = volume_write(ctx->pool, &ctx->io, ctx->buf, NULL);

        ctx->func(ctx->arg, &ret);
        yfree((void **)&ctx);
}

void volume_write_async(const char *pool, const fileid_t *id, const buffer_t *buf, size_t size,
                      off_t offset, func1_t func, void *arg)
{
        int ret;
        volume_async_ctx_t *ctx;

        ret = ymalloc((void **)&ctx, sizeof(*ctx));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        io_init(&ctx->io, id, NULL, offset, size, 0);
        ctx->buf = (void *)buf;
        ctx->func = func;
        ctx->arg = arg;
        strcpy(ctx->pool, pool);

        ret = main_loop_request(__volume_write_async, ctx, "volume_write");
        if (unlikely(ret))
                GOTO(err_free, ret);

        return;
err_free:
        yfree((void **)&ctx);
err_ret:
        func(arg, &ret);
        return;
}

/**
 * @brief
 *
 * @param pool
 * @param parent
 * @param fileid
 * @param buf
 * @param size
 * @param offset
 * @param fillzero 1 | 0
 * @return 如无，返回全0的buf，ret=0
 */
int volume_snapshot_read(const char *pool, const fileid_t *parent, const fileid_t *fileid,
                         buffer_t *buf, size_t size, off_t offset, BOOL fillzero)
{
        int ret;
        nid_t nid;
        io_t io;

        if (parent->type != __VOLUME_CHUNK__) {
                DWARN("read "CHKID_FORMAT" size %ju offset %ju\n", CHKID_ARG(parent), size, offset);
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        DBUG("read file "CHKID_FORMAT" size %ju offset %ju\n", CHKID_ARG(parent), size, offset);

        ret = md_map_getsrv(parent, &nid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        io_init(&io, fileid, NULL, offset, size, fillzero ? 0 : __FILE_ATTR_NOFILL__);
        if (net_islocal(&nid)) {
                ret = volume_ctl_snapshot_read(parent, &io, buf);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                ret = stor_rpc_snapshot_read(&nid, parent, &io, buf);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        if (ret == EREMCHG) {
                vnode_reload(pool, parent);
        }
        return _errno(ret);
}

int volume_snapshot_flat(const char *pool, const fileid_t *id, int idx, int force)
{
        int ret;
        nid_t nid;

        if (unlikely(id->type != __VOLUME_CHUNK__)) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        ret = vnode_location(pool, id, &nid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (net_islocalcall(&nid)) {
                ret = volume_ctl_snapshot_flat(id, idx, force);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                ret = stor_rpc_snapshot_flat(&nid, id, idx, force);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        if (ret == EREMCHG) {
                vnode_reload(pool, id);
        }
        return _errno(ret);
}

int volume_snapshot_diff(const char *pool, const fileid_t *parent, const fileid_t *fileid,
                const fileid_t *snapdst, buffer_t *buf, size_t size, off_t offset)
{
        int ret;
        nid_t nid;

        if (parent->type != __VOLUME_CHUNK__) {
                DWARN("read "CHKID_FORMAT" size %llu offset %llu\n",
                      CHKID_ARG(parent), (LLU)size, (LLU)offset);
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        DBUG("read file "CHKID_FORMAT" size %llu offset %llu\n",
             CHKID_ARG(parent), (LLU)size, (LLU)offset);

        ret = md_map_getsrv(parent, &nid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (net_islocalcall(&nid)) {
                ret = volume_ctl_snapshot_diff(parent, fileid, snapdst, buf, size, offset);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                ret = stor_rpc_snapshot_diff(&nid, parent, fileid, snapdst, buf, size, offset);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        if (ret == EREMCHG) {
                vnode_reload(pool, parent);
        }
        return _errno(ret);
}
